package io.github.talelin.latticy.module.message;

import com.alibaba.fastjson.JSONObject;

import io.github.talelin.latticy.common.util.ConvertGson;
import io.github.talelin.latticy.common.util.TextUtil;
import io.github.talelin.latticy.model.DeviceDO;
import io.github.talelin.latticy.model.InsAccountInfoDO;
import io.github.talelin.latticy.model.InsSendUserInfoDO;
import io.github.talelin.latticy.model.UserDO;
import io.github.talelin.latticy.service.DeviceService;
import io.github.talelin.latticy.service.GroupService;
import io.github.talelin.latticy.service.InsAccountInfoService;
import io.github.talelin.latticy.service.InsSendUserInfoService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket handler implementation.
 *
 * <p>Keeps track of the open sessions, processes the JSON messages sent by devices
 * ({@code bind}, {@code send_finish}, {@code heartbeat}) and runs a server-side heartbeat that
 * closes sessions which stop answering.
 *
 * <p>The raw query string of the handshake URI is used as the device id of a session.
 *
 * @author pedro@TaleLin
 * @author Juzi@TaleLin
 */
@Slf4j
@Service
public class WsHandlerImpl implements WsHandler {

    /** Pause between two server heartbeat broadcasts, in milliseconds. */
    private static final long HEARTBEAT_SEND_INTERVAL_MS = 10000L;

    /** Pause between two scans for expired sessions, in milliseconds. */
    private static final long HEARTBEAT_CHECK_INTERVAL_MS = 1000L;

    /** A session is expired once its oldest unanswered heartbeat is older than this, in milliseconds. */
    private static final long HEARTBEAT_TIMEOUT_MS = 30000L;

    /** A session is only considered for expiry after more than this many unanswered heartbeats. */
    private static final int MAX_UNANSWERED_HEARTBEATS = 3;

    /**
     * Heartbeat bookkeeping per device id. The map doubles as the monitor that guards the
     * entities stored in it and the registration / de-registration of sessions.
     */
    private static final ConcurrentHashMap<String, WebSocketEntity> heartbeatStates = new ConcurrentHashMap<>();

    @Autowired
    private DeviceService deviceService;

    @Autowired
    private InsSendUserInfoService sendUserInfoService;

    @Autowired
    private InsAccountInfoService accountService;

    @Autowired
    private GroupService groupService;

    private final AtomicInteger connectionCount = new AtomicInteger(0);

    private final CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();

    /** Set once the heartbeat threads have been started; volatile so the unlocked pre-check is safe. */
    private volatile boolean isHeart;

    /**
     * Registers a newly opened session.
     *
     * <p>Sessions previously registered under the same device id are dropped from the registry
     * first, so that a reconnecting device is only counted once. The heartbeat threads are
     * started when the first session is opened.
     *
     * @param session the session that was just established
     */
    @Override
    public void handleOpen(WebSocketSession session) {
        log.info("a new connection opened");
        String deviceId = deviceIdOf(session);
        boolean added;
        synchronized (heartbeatStates) {
            if (deviceId != null) {
                for (WebSocketSession oldSession : sessions) {
                    // Only adjust the counter for sessions this call actually removed.
                    if (deviceId.equals(deviceIdOf(oldSession)) && sessions.remove(oldSession)) {
                        connectionCount.decrementAndGet();
                    }
                }
                // A fresh entity resets the heartbeat counters for the reconnecting device.
                heartbeatStates.put(deviceId, new WebSocketEntity());
            } else {
                // ConcurrentHashMap rejects null keys, so such a session cannot be heartbeat-tracked.
                log.warn("connection {} opened without a device id, heartbeat tracking is disabled for it",
                        session.getId());
            }
            added = sessions.add(session);
        }

        int cnt = added ? connectionCount.incrementAndGet() : connectionCount.get();
        log.info("a new connection opened，current online count：{}", cnt);

        if (!isHeart) {
            startHeart();
        }
    }

    /**
     * Unregisters a session and marks its device as offline.
     *
     * <p>The method is idempotent: a session that is not (or no longer) registered is ignored.
     * This matters because it is invoked from {@link #handleError} and from the heartbeat
     * checker, in addition to whatever external close callback calls it, and because a session
     * replaced by a reconnect of the same device must not take the new session's state with it.
     *
     * @param session the session that was closed
     */
    @Override
    public void handleClose(WebSocketSession session) {
        String deviceId = deviceIdOf(session);
        boolean deviceGone;
        synchronized (heartbeatStates) {
            if (!sessions.remove(session)) {
                log.debug("ignoring close of untracked session {}", session.getId());
                return;
            }
            // Keep the heartbeat entry and the online flag while another session serves the device.
            deviceGone = deviceId != null && getWebSocketSession(deviceId) == null;
            if (deviceGone) {
                heartbeatStates.remove(deviceId);
            }
        }

        int cnt = connectionCount.decrementAndGet();
        log.info("a connection closed，current online count：{}", cnt);

        if (!deviceGone) {
            return;
        }
        DeviceDO device = deviceService.getByDeviceId(deviceId);
        if (device != null) {
            device.setOnline(0);
            deviceService.updateDevice(device);
        }
    }

    /**
     * Dispatches a JSON text message received from a client.
     *
     * <p>Recognised values of the {@code action} field are {@code bind}, {@code send_finish}
     * and {@code heartbeat}; anything else is ignored. Messages that are not valid JSON or
     * carry no {@code action} are logged and dropped.
     *
     * @param session the session the message arrived on
     * @param message the raw message text
     */
    @Override
    public void handleMessage(WebSocketSession session, String message) {
        log.info("收到消息：{}", message);
        JSONObject payload = parseJson(message);
        if (payload == null) {
            return;
        }
        String action = payload.getString("action");
        if (action == null) {
            log.warn("ignoring websocket message without action");
            return;
        }
        String deviceId = payload.getString("deviceId");
        String seqId = payload.getString("seqId");

        switch (action) {
            case "bind":
                sendAck(session, seqId, action);
                bindDevice(deviceId, payload.getString("deviceTag"));
                break;
            case "send_finish":
                sendAck(session, seqId, action);
                finishSend(deviceId, payload.getString("req"));
                break;
            case "heartbeat":
                recordHeartbeat(deviceId);
                break;
            default:
                break;
        }
    }

    /**
     * Sends a text message to one session.
     *
     * @param session the target session
     * @param message the text to send
     * @throws IOException if the underlying session fails to send
     */
    @Override
    public void sendMessage(WebSocketSession session, String message) throws IOException {
        // Routine traffic, not an error condition.
        log.info("发送消息{}：{}", session.getUri(), message);
        this.sendMessage(session, new TextMessage(message));
    }

    /**
     * Sends a message to the first open session whose {@link MessageConstant#USER_KEY}
     * attribute holds the user with the given id. Does nothing when there is no such session.
     *
     * @param userId  id of the target user
     * @param message the message to send
     * @throws IOException if the underlying session fails to send
     */
    @Override
    public void sendMessage(Integer userId, TextMessage message) throws IOException {
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            UserDO user = userOf(session);
            if (user != null && user.getId() != null && user.getId().equals(userId)) {
                session.sendMessage(message);
                return;
            }
        }
    }

    /**
     * Text variant of {@link #sendMessage(Integer, TextMessage)}.
     *
     * @param userId  id of the target user
     * @param message the text to send
     * @throws IOException if the underlying session fails to send
     */
    @Override
    public void sendMessage(Integer userId, String message) throws IOException {
        sendMessage(userId, new TextMessage(message));
    }

    /**
     * Sends a message to one session without logging it.
     *
     * @param session the target session
     * @param message the message to send
     * @throws IOException if the underlying session fails to send
     */
    @Override
    public void sendMessage(WebSocketSession session, TextMessage message) throws IOException {
        session.sendMessage(message);
    }

    /**
     * Sends a text message to every open session. The first send failure aborts the broadcast.
     *
     * @param message the text to send
     * @throws IOException if sending to a session fails
     */
    @Override
    public void broadCast(String message) throws IOException {
        for (WebSocketSession session : sessions) {
            if (session.isOpen()) {
                sendMessage(session, message);
            }
        }
    }

    /**
     * Sends a message to every open session. The first send failure aborts the broadcast.
     *
     * @param message the message to send
     * @throws IOException if sending to a session fails
     */
    @Override
    public void broadCast(TextMessage message) throws IOException {
        for (WebSocketSession session : sessions) {
            if (session.isOpen()) {
                session.sendMessage(message);
            }
        }
    }

    /**
     * Text variant of {@link #broadCastToGroup(Integer, TextMessage)}.
     *
     * @param groupId id of the target group
     * @param message the text to send
     * @throws IOException if sending to a session fails
     */
    @Override
    public void broadCastToGroup(Integer groupId, String message) throws IOException {
        this.broadCastToGroup(groupId, new TextMessage(message));
    }

    /**
     * Sends a message to every open session whose user belongs to the given group, as reported
     * by {@link GroupService#getGroupUserIds}.
     *
     * @param groupId id of the target group
     * @param message the message to send
     * @throws IOException if sending to a session fails
     */
    @Override
    public void broadCastToGroup(Integer groupId, TextMessage message) throws IOException {
        List<Integer> userIds = groupService.getGroupUserIds(groupId);
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            UserDO user = userOf(session);
            if (user != null && userIds.contains(user.getId())) {
                session.sendMessage(message);
            }
        }
    }

    /**
     * Logs a transport error and unregisters the session.
     *
     * @param session the failing session
     * @param error   the reported error
     */
    @Override
    public void handleError(WebSocketSession session, Throwable error) {
        // A trailing Throwable argument is logged with its stack trace by SLF4J.
        log.error("websocket error：{}，session id： {}", error.getMessage(), session.getId(), error);
        this.handleClose(session);
    }

    /**
     * @return the live set of registered sessions (not a copy)
     */
    @Override
    public CopyOnWriteArraySet<WebSocketSession> getSessions() {
        return sessions;
    }

    /**
     * Looks up the registered session of a device.
     *
     * @param deviceId the device id, i.e. the query string the device connected with
     * @return the first matching session, or {@code null} if there is none or {@code deviceId} is {@code null}
     */
    public WebSocketSession getWebSocketSession(String deviceId) {
        if (deviceId == null) {
            return null;
        }
        for (WebSocketSession session : sessions) {
            if (deviceId.equals(deviceIdOf(session))) {
                return session;
            }
        }
        return null;
    }

    /**
     * @return the number of sessions currently counted as connected
     */
    @Override
    public int getConnectionCount() {
        return connectionCount.get();
    }

    /**
     * Sends the given heartbeat message to every heartbeat-tracked session and records the
     * attempt in the session's {@link WebSocketEntity}.
     *
     * <p>The counters are updated under the map lock, the actual send happens outside of it.
     * A failing session is logged and skipped so it cannot keep the remaining sessions from
     * being pinged; its unanswered-heartbeat count still grows, which lets the checker thread
     * expire it.
     *
     * @param message the heartbeat payload
     * @throws IOException never thrown by this implementation; kept for signature compatibility
     */
    public synchronized void sendPing(String message) throws IOException {
        for (WebSocketSession session : sessions) {
            String deviceId = deviceIdOf(session);
            if (deviceId == null) {
                continue;
            }
            synchronized (heartbeatStates) {
                WebSocketEntity state = heartbeatStates.get(deviceId);
                if (state == null) {
                    continue;
                }
                if (state.getSendCount() < 1) {
                    // First unanswered heartbeat: this is where the timeout window starts.
                    state.setLastTime(System.currentTimeMillis());
                }
                state.setSendCount(state.getSendCount() + 1);
            }
            try {
                sendMessage(session, message);
            } catch (IOException | RuntimeException e) {
                log.warn("failed to send heartbeat to device {}", deviceId, e);
            }
        }
    }

    /**
     * Starts the heartbeat sender and checker threads, at most once per handler instance.
     */
    private synchronized void startHeart() {
        if (isHeart) {
            return;
        }
        isHeart = true;

        Thread keepThread = new Thread(new KeepHeartThread(), "ws-heartbeat-sender");
        Thread examineThread = new Thread(new ExamineHeartThread(), "ws-heartbeat-checker");
        // Daemon threads: the endless loops must not keep the JVM alive on shutdown.
        keepThread.setDaemon(true);
        examineThread.setDaemon(true);

        keepThread.start();
        examineThread.start();
    }

    /**
     * Returns the device id of a session, which is the query string of its handshake URI.
     *
     * @return the query string, or {@code null} if the session has no URI or no query
     */
    private static String deviceIdOf(WebSocketSession session) {
        return session.getUri() == null ? null : session.getUri().getQuery();
    }

    /**
     * Returns the user stored under {@link MessageConstant#USER_KEY} in the session attributes,
     * or {@code null} if the attribute is missing or not a {@link UserDO}.
     */
    private static UserDO userOf(WebSocketSession session) {
        Object user = session.getAttributes().get(MessageConstant.USER_KEY);
        return user instanceof UserDO ? (UserDO) user : null;
    }

    /**
     * Parses a JSON object, returning {@code null} (after logging) for blank or malformed input.
     */
    private JSONObject parseJson(String text) {
        try {
            JSONObject parsed = JSONObject.parseObject(text);
            if (parsed == null) {
                log.warn("ignoring empty websocket payload");
            }
            return parsed;
        } catch (RuntimeException e) {
            log.warn("ignoring malformed websocket payload", e);
            return null;
        }
    }

    /**
     * Acknowledges a client request by echoing its sequence id and action with type {@code "2"}.
     * A send failure is logged and does not stop the processing of the request.
     */
    private void sendAck(WebSocketSession session, String seqId, String action) {
        Response response = new Response();
        response.setSeqId(seqId);
        response.setAction(action);
        response.setType("2");
        try {
            sendMessage(session, ConvertGson.toJson(response));
        } catch (IOException e) {
            log.error("failed to acknowledge action {} (seqId {})", action, seqId, e);
        }
    }

    /**
     * Handles {@code bind}: updates the tag and online flag of a known device, or creates the
     * device record if it does not exist yet. Does nothing without a device id.
     */
    private void bindDevice(String deviceId, String deviceTag) {
        if (TextUtil.isEmpty(deviceId)) {
            return;
        }
        DeviceDO device = deviceService.getByDeviceId(deviceId);
        if (device == null) {
            DeviceDO created = new DeviceDO();
            created.setDeviceId(deviceId);
            created.setDeviceTag(deviceTag);
            deviceService.createDevice(created);
            return;
        }
        device.setDeviceTag(deviceTag);
        if (device.getTaskStatus() == 1) {
            device.setTaskStatus(2);
        }
        device.setOnline(1);
        deviceService.updateDevice(device);
    }

    /**
     * Handles {@code send_finish}: applies the reported result to the sending account, the
     * recipient and the device. Does nothing without a device id.
     *
     * <p>{@code req} is expected to be a JSON object with the fields {@code account},
     * {@code sendUsername} and {@code type}. Result types per the original comments:
     * 1 = wrong password, 2 = account frozen, 3 = operating too frequently, 4 = recipient
     * abnormal / task timed out (the original comments disagree on 4 - TODO confirm),
     * anything else = sent successfully.
     */
    private void finishSend(String deviceId, String req) {
        if (TextUtil.isEmpty(deviceId)) {
            return;
        }
        String account = null;
        String sendUsername = null;
        String type = null;
        if (!TextUtil.isEmpty(req)) {
            JSONObject jsonReq = parseJson(req);
            if (jsonReq != null) {
                account = jsonReq.getString("account");
                sendUsername = jsonReq.getString("sendUsername");
                type = jsonReq.getString("type");
            }
        }
        updateSenderAccount(account, type);
        updateRecipient(sendUsername, type);
        releaseDevice(deviceId, type);
    }

    /**
     * Updates status and send count of the sending account according to the result type.
     * Skipped when the account name or the type is missing or the account is unknown.
     */
    private void updateSenderAccount(String account, String type) {
        if (TextUtil.isEmpty(account) || type == null) {
            log.warn("send_finish without account or type, sender account left unchanged");
            return;
        }
        InsAccountInfoDO insAccountInfo = accountService.selectByUsername(account);
        if (insAccountInfo == null) {
            return;
        }
        switch (type) {
            case "1":
                // wrong password
                insAccountInfo.setStatus(InsAccountInfoDO.STATUS_PWD_ERROR);
                break;
            case "2":
                // account frozen
                insAccountInfo.setStatus(InsAccountInfoDO.STATUS_ERROR);
                break;
            case "3":
                // operating too frequently
                insAccountInfo.setStatus(InsAccountInfoDO.STATUS_EXCEPTION);
                break;
            case "4":
                // not the account's fault, it stays usable
                insAccountInfo.setStatus(InsAccountInfoDO.STATUS_NORMAL);
                break;
            default:
                // sent successfully
                insAccountInfo.setCount(insAccountInfo.getCount() + 1);
                insAccountInfo.setStatus(InsAccountInfoDO.STATUS_NORMAL);
                break;
        }
        accountService.updateInsAccountInfo(insAccountInfo);
    }

    /**
     * Updates the status of the recipient: 2 for result type "0" (sent successfully), 4 for
     * type "4", 0 otherwise. Skipped when the recipient name is missing or unknown.
     */
    private void updateRecipient(String sendUsername, String type) {
        if (TextUtil.isEmpty(sendUsername)) {
            return;
        }
        InsSendUserInfoDO insSendUserInfo = sendUserInfoService.selectByUsername(sendUsername);
        if (insSendUserInfo == null) {
            log.warn("send_finish for unknown recipient {}", sendUsername);
            return;
        }
        if (TextUtil.equals(type, "0")) {
            insSendUserInfo.setStatus(2);
        } else if (TextUtil.equals(type, "4")) {
            insSendUserInfo.setStatus(4);
        } else {
            insSendUserInfo.setStatus(0);
        }
        sendUserInfoService.updateInsAccountStatus(insSendUserInfo);
    }

    /**
     * Counts a successful send on the device and moves it from task status 1 ("in progress"
     * per the original comment) to 2 ("waiting"). Skipped for unknown devices.
     */
    private void releaseDevice(String deviceId, String type) {
        DeviceDO device = deviceService.getByDeviceId(deviceId);
        if (device == null) {
            return;
        }
        if (TextUtil.equals(type, "0")) {
            device.setSendSuccessCount(device.getSendSuccessCount() + 1);
        }
        if (device.getTaskStatus() == 1) {
            device.setTaskStatus(2);
        }
        deviceService.updateDevice(device);
    }

    /**
     * Handles {@code heartbeat}: resets the unanswered-heartbeat counter of the device and
     * records the time of the answer. Heartbeats for untracked devices are ignored.
     */
    private void recordHeartbeat(String deviceId) {
        if (deviceId == null) {
            log.warn("ignoring heartbeat without device id");
            return;
        }
        synchronized (heartbeatStates) {
            WebSocketEntity state = heartbeatStates.get(deviceId);
            if (state == null) {
                log.warn("ignoring heartbeat of untracked device {}", deviceId);
                return;
            }
            state.setSendCount(0);
            state.setLastTime(System.currentTimeMillis());
        }
    }

    /**
     * Tells whether the device has more than {@link #MAX_UNANSWERED_HEARTBEATS} unanswered
     * heartbeats and the oldest of them is older than {@link #HEARTBEAT_TIMEOUT_MS}.
     */
    private boolean isHeartbeatExpired(String deviceId, long now) {
        synchronized (heartbeatStates) {
            WebSocketEntity state = heartbeatStates.get(deviceId);
            return state != null
                    && state.getSendCount() > MAX_UNANSWERED_HEARTBEATS
                    && state.getLastTime() != 0
                    && (now - state.getLastTime()) > HEARTBEAT_TIMEOUT_MS;
        }
    }

    /**
     * Closes and unregisters every session whose heartbeat has expired. The close and the
     * follow-up bookkeeping run outside the map lock because they do network and service calls.
     */
    private void closeExpiredSessions() {
        long now = System.currentTimeMillis();
        for (WebSocketSession session : sessions) {
            String deviceId = deviceIdOf(session);
            if (deviceId == null || !isHeartbeatExpired(deviceId, now)) {
                continue;
            }
            log.error("没收到心跳包：{}", deviceId);
            try {
                session.close();
            } catch (IOException | RuntimeException e) {
                log.error("检测心跳异常：", e);
            } finally {
                // Unregister even if the close failed; handleClose ignores already removed sessions.
                handleClose(session);
            }
        }
    }

    /**
     * Sends the server heartbeat to all tracked sessions every
     * {@link #HEARTBEAT_SEND_INTERVAL_MS} milliseconds until the thread is interrupted.
     *
     * @author zhangran
     */
    private class KeepHeartThread implements Runnable {

        @Override
        public void run() {
            Map<String, Object> heartbeat = new HashMap<>();
            heartbeat.put("action", "heartbeat");
            String payload = ConvertGson.toJson(heartbeat);

            while (!Thread.currentThread().isInterrupted()) {
                try {
                    log.info("发送心跳包当前人数为:{}", sessions.size());
                    sendPing(payload);
                } catch (Exception e) {
                    log.error("发送心跳包异常：", e);
                }
                // Sleep outside the try above so a failing round cannot turn into a busy loop.
                try {
                    Thread.sleep(HEARTBEAT_SEND_INTERVAL_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

    }

    /**
     * Checks every {@link #HEARTBEAT_CHECK_INTERVAL_MS} milliseconds whether clients still
     * answer the heartbeat and closes those that do not, until the thread is interrupted.
     *
     * @author zhangran
     */
    private class ExamineHeartThread implements Runnable {

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    closeExpiredSessions();
                } catch (RuntimeException e) {
                    // Keep the checker alive; a single failing round must not end heartbeat checking.
                    log.error("检测心跳异常：", e);
                }
                try {
                    Thread.sleep(HEARTBEAT_CHECK_INTERVAL_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
